package com.hualala.datasink.listener;


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Title: <br>
 * Description: <br>
 * Copyright: Copyright (c) 2024/5/31<br>
 * Company: 磨刀石<br>
 *
 * @author chenglin
 * @codeGenerator idea
 */
@Component
@Slf4j
public class KafkaConsumer {

    private static final Map<String, StringBuffer> columnMap = new ConcurrentHashMap<String, StringBuffer>(16);

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @KafkaListener(topics = "maxwell",containerFactory = "batchFactory")
    public void onMessage(List<ConsumerRecord<String,String>> consumerRecordList,Acknowledgment acknowledgment) throws SQLException {
        log.info("listen kafka.maxwell fetch data size [{}]",consumerRecordList.size());
        for (ConsumerRecord<String, String> record : consumerRecordList) {
            log.info("receive maxwell message:{}", record.value());
            String key = record.key();
            String value = record.value();
            // 解析JSON格式的Kafka消息
            JSONObject valueJson = JSON.parseObject(value);
            String database = valueJson.getString("database");
            String table = valueJson.getString("table");
            String type = valueJson.getString("type");
            JSONObject data = valueJson.getJSONObject("data");

            Connection connection = Objects.requireNonNull(jdbcTemplate.getDataSource()).getConnection();
            switch (type) {
                case "insert":
                    buildInsertStatement(connection, database, table, data);
                    break;
                case "bootstrap-insert":
                    buildInsertStatement(connection, database, table, data);
                    break;
                case "update":
                    buildUpdateStatement(connection, database, table, data);
                    break;
                case "delete":
                    buildDeleteStatement(connection, database, table, data);
                    break;
                default:
                    // 不支持的操作类型，忽略
                    break;
            }
        }

        acknowledgment.acknowledge();
    }

    // 根据表的结构构建INSERT语句
    private static void buildInsertStatement(Connection connection, String database, String table, JSONObject data) throws SQLException {
        StringBuffer columns = new StringBuffer();
        StringBuilder values = new StringBuilder();
        List<Object> fieldValues = new ArrayList<>();
        PreparedStatement stmt = null;
        try {
            // 获取表的字段信息
            columns  = extracted(connection, database, table, data, columns, values, fieldValues);
            String sql = "INSERT IGNORE  INTO `" + database + "`.`" + table + "` (" + columns.toString() + ") VALUES (" + values.toString() + ")";
            stmt  = connection.prepareStatement(sql);
            // 为占位符设置值
            for (int i = 0; i < fieldValues.size(); i++) {
                stmt.setObject(i + 1, fieldValues.get(i));
            }

        } catch (SQLException e) {
            e.printStackTrace();
        }
        if (stmt != null) {
            try {
                stmt.executeUpdate();
            } catch (SQLException e) {
                e.printStackTrace();
            }finally {
                stmt.close();
                connection.close();;
            }
        }
    }


    private static StringBuffer extracted(Connection connection, String database, String table, JSONObject data, StringBuffer columns, StringBuilder values, List<Object> fieldValues) throws SQLException {
        if (StringUtils.isEmpty(database) || StringUtils.isEmpty(table)) {
            throw new IllegalArgumentException("Database and table names must not be blank.");
        }

        String key = database + table;
        StringBuffer cachedColumns = columnMap.get(key);
        if (cachedColumns != null) {
            log.info("Columns fetched from columnMap for database: {}, table: {}", database, table);
            String[] columnNameList = cachedColumns.toString().split("\\,");
            for (String columnName : columnNameList){
                Object fieldValue = data.get(columnName.trim());
                if (fieldValue != null) {
                    columns.append("").append(columnName).append(", ");
                    values.append("?, ");
                    fieldValues.add(fieldValue);
                }
            }

            values.delete(values.length() - 2, values.length());
            return cachedColumns;
        }

        // 动态构建SQL语句的字段部分和值部分
        try (PreparedStatement metadataStmt = connection.prepareStatement(
                "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?")) {
            metadataStmt.setString(1, database);
            metadataStmt.setString(2, table);
            ResultSet resultSet = metadataStmt.executeQuery();
            while (resultSet.next()) {
                String columnName = resultSet.getString("COLUMN_NAME");
                Object fieldValue = data.get(columnName);
                if (fieldValue != null) {
                    columns.append("").append(columnName).append(", ");
                    values.append("?, ");
                    fieldValues.add(fieldValue);
                }
            }
        }
        columns.delete(columns.length() - 2, columns.length());
        values.delete(values.length() - 2, values.length());
        columnMap.put(key, columns);
        log.info("Columns fetched and stored in columnMap for database: {}, table: {},columns  {} ,values {}", database, table,columns,values);
        return columns;
    }
    // 根据表的结构构建UPDATE语句
    private static void buildUpdateStatement(Connection connection, String database, String table, JSONObject data) throws SQLException {
        StringBuilder updateSet = new StringBuilder();
        List<Object> fieldValues = new ArrayList<>();
        PreparedStatement stmt  = null;
        try {
            // 获取表的字段信息
            PreparedStatement metadataStmt = connection.prepareStatement("SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?");
            metadataStmt.setString(1, database);
            metadataStmt.setString(2, table);
            ResultSet resultSet = metadataStmt.executeQuery();

            while (resultSet.next()) {
                String columnName = resultSet.getString("COLUMN_NAME");
                Object fieldValue = data.get(columnName);

                if (fieldValue != null && !columnName.equalsIgnoreCase("id")) {
                    // 构造UPDATE语句的SET部分，排除id字段
                    updateSet.append("`").append(columnName).append("` = ?, ");
                    fieldValues.add(fieldValue);
                }
            }

            updateSet.delete(updateSet.length() - 2, updateSet.length());

            // 获取id字段的值
            Object idValue = data.get("id");

            String sql = "UPDATE `" + database + "`.`" + table + "` SET " + updateSet.toString() + " WHERE `id` = ?";

            log.info("generator sql {}",sql);
            stmt = connection.prepareStatement(sql);

            // 为占位符设置更新值
            for (int i = 0; i < fieldValues.size(); i++) {
                stmt.setObject(i + 1, fieldValues.get(i));
            }
            // 设置id字段的值
            stmt.setObject(fieldValues.size() + 1, idValue);
        } catch (SQLException e) {
            e.printStackTrace();
        }

        if (stmt != null) {
            try {
                stmt.executeUpdate();
            } catch (SQLException e) {
                e.printStackTrace();
            }finally {
                stmt.close();
                connection.close();;
            }
        }
    }

    // 根据表的结构构建DELETE语句
    private static void buildDeleteStatement(Connection connection, String database, String table, JSONObject data) throws SQLException {
        PreparedStatement stmt = null;
        try {

            // 获取id字段的值
            Object idValue = data.get("id");

            String sql = "DELETE FROM `" + database + "`.`" + table + "` WHERE `id` = ?";
            stmt = connection.prepareStatement(sql);

            // 设置id字段的值
            stmt.setObject(1, idValue);

        } catch (SQLException e) {
            e.printStackTrace();
        }

        if (stmt != null) {
            try {
                stmt.executeUpdate();
            } catch (SQLException e) {
                e.printStackTrace();
            }finally {
                stmt.close();
                connection.close();;
            }
        }
    }




}
